package org.budo.warehouse.logic.producer.canal;

import java.util.ArrayList;
import java.util.List;

import javax.annotation.Resource;

import org.budo.canal.message.handler.DefaultCanalMessageHandler;
import org.budo.warehouse.logic.api.DataConsumer;
import org.budo.warehouse.logic.api.DataEntry;
import org.budo.warehouse.logic.api.DataMessage;
import org.budo.warehouse.logic.api.DataMessagePojo;
import org.budo.warehouse.logic.util.DataEntryUtil;

import com.alibaba.otter.canal.protocol.CanalEntry.Entry;

import lombok.Getter;
import lombok.Setter;
import lombok.extern.slf4j.Slf4j;

/**
 * @author limingwei
 */
@Getter
@Setter
@Slf4j
public class WarehouseCanalMessageHandler extends DefaultCanalMessageHandler {
    @Resource(name = "dispatcherDataConsumer")
    private DataConsumer dataConsumer;

    private Integer dataNodeId;

    @Override
    protected void handleEntries(Long messageId, List<Entry> entries) {
        List<DataEntry> dataEntries = new ArrayList<DataEntry>();
        for (Entry entry : entries) {
            CanalDataEntry canalDataEntry = new CanalDataEntry(entry);
            dataEntries.add(canalDataEntry);

            if (log.isDebugEnabled()) {
                log.debug("#39 canalDataEntry={}", DataEntryUtil.toSimpleString(canalDataEntry));
            }
        }

        DataMessage dataMessage = new DataMessagePojo(messageId, this.getDataNodeId(), dataEntries);
        dataConsumer.consume(dataMessage);
    }
}